跳到主要内容

Canal 同步数据库

1. 什么是 Canal?它的核心作用是什么?

考察点: 数据库同步工具理解、中间件基础知识

Canal 是阿里开源的基于 MySQL 数据库 binlog 的增量订阅&消费组件。其核心作用是:

  • 数据库镜像:实时同步数据库变更
  • 数据库实时备份:基于 binlog 的增量备份
  • 索引构建和实时维护:如 Elasticsearch 索引更新
  • 业务缓存刷新:如 Redis 缓存同步
  • 增量数据处理:带业务逻辑的数据变更处理

Canal 的工作原理:

  1. Canal 模拟 MySQL slave 的交互协议
  2. 向 MySQL Master 发送 dump 协议
  3. MySQL Master 推送 binary log 给 Canal
  4. Canal 解析 binary log 并发送到目标存储

2. Canal 的架构设计和工作流程是怎样的?

考察点: 系统架构设计、分布式系统理解

关键组件说明:

  • Canal Server:负责连接 MySQL,解析 binlog
  • Canal Client:消费 Canal Server 的数据,执行业务逻辑
  • Binary Log:MySQL 的二进制日志,记录所有数据变更

3. 在 Go 中如何实现 Canal 客户端?请设计一个基本的架构

考察点: Go 语言实践、设计模式、错误处理

package main

import (
"context"
"log"
"time"

"github.com/withlin/canal-go/client"
"github.com/withlin/canal-go/protocol"
)

type CanalClient struct {
client *client.SimpleCanalConnector
handlers map[string]DataHandler
}

type DataHandler interface {
Handle(entry *protocol.Entry) error
}

type UserDataHandler struct{}

func (h *UserDataHandler) Handle(entry *protocol.Entry) error {
// 处理用户表数据变更
log.Printf("处理用户数据: %v", entry)
return nil
}

func NewCanalClient(addr string, port int, username, password, destination string) *CanalClient {
connector := client.NewSimpleCanalConnector(addr, port, username, password, destination, 60000, 60*60*1000)

return &CanalClient{
client: connector,
handlers: make(map[string]DataHandler),
}
}

func (c *CanalClient) RegisterHandler(tableName string, handler DataHandler) {
c.handlers[tableName] = handler
}

func (c *CanalClient) Start(ctx context.Context) error {
err := c.client.Connect()
if err != nil {
return err
}
defer c.client.DisConnect()

// 订阅所有表
err = c.client.Subscribe(".*\\..*")
if err != nil {
return err
}

for {
select {
case <-ctx.Done():
return ctx.Err()
default:
message, err := c.client.Get(100, nil, nil)
if err != nil {
log.Printf("获取数据失败: %v", err)
time.Sleep(time.Second)
continue
}

if message.Id == -1 || len(message.Entries) == 0 {
time.Sleep(time.Second)
continue
}

c.processEntries(message.Entries)
c.client.Ack(message.Id)
}
}
}

func (c *CanalClient) processEntries(entries []*protocol.Entry) {
for _, entry := range entries {
if entry.GetEntryType() == protocol.EntryType_TRANSACTIONBEGIN ||
entry.GetEntryType() == protocol.EntryType_TRANSACTIONEND {
continue
}

tableName := entry.GetHeader().GetTableName()
if handler, exists := c.handlers[tableName]; exists {
if err := handler.Handle(entry); err != nil {
log.Printf("处理数据失败: %v", err)
}
}
}
}

4. Canal 的数据一致性如何保证?如果出现数据丢失怎么处理?

考察点: 数据一致性、容错机制、监控告警

数据一致性保证机制:

Go 实现示例:

type ReliableProcessor struct {
maxRetries int
retryDelay time.Duration
deadQueue chan *protocol.Entry
}

func (p *ReliableProcessor) ProcessWithRetry(entry *protocol.Entry, handler DataHandler) error {
var lastErr error

for i := 0; i < p.maxRetries; i++ {
if err := handler.Handle(entry); err != nil {
lastErr = err
log.Printf("处理失败,第 %d 次重试: %v", i+1, err)
time.Sleep(p.retryDelay * time.Duration(i+1)) // 指数退避
continue
}
return nil // 成功
}

// 重试耗尽,放入死信队列
select {
case p.deadQueue <- entry:
log.Printf("数据放入死信队列: %v", entry)
default:
log.Printf("死信队列已满,数据丢失: %v", entry)
}

return lastErr
}

5. 如何处理 Canal 的高可用和扩展性问题?

考察点: 高可用架构、负载均衡、分布式系统

Go 实现高可用客户端:

type HAClient struct {
servers []ServerConfig
current int
client *CanalClient
mutex sync.RWMutex
}

type ServerConfig struct {
Host string
Port int
}

func (ha *HAClient) connectWithFailover() error {
ha.mutex.Lock()
defer ha.mutex.Unlock()

for i := 0; i < len(ha.servers); i++ {
server := ha.servers[ha.current]

client := NewCanalClient(server.Host, server.Port, "canal", "canal", "example")
if err := client.Connect(); err != nil {
log.Printf("连接服务器 %s:%d 失败: %v", server.Host, server.Port, err)
ha.current = (ha.current + 1) % len(ha.servers)
continue
}

ha.client = client
return nil
}

return errors.New("所有 Canal 服务器都不可用")
}

6. Canal 在生产环境中的监控指标有哪些?如何实现?

考察点: 生产环境运维、监控体系、性能调优

关键监控指标:

type CanalMetrics struct {
// 连接状态
ConnectionStatus prometheus.Gauge

// 处理性能
ProcessedMessages prometheus.Counter
ProcessingDuration prometheus.Histogram

// 错误统计
ProcessingErrors prometheus.Counter
RetryCount prometheus.Counter

// 队列状态
QueueSize prometheus.Gauge
DeadLetterQueueSize prometheus.Gauge

// 延迟监控
ProcessingLag prometheus.Gauge
}

func NewCanalMetrics() *CanalMetrics {
return &CanalMetrics{
ConnectionStatus: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "canal_connection_status",
Help: "Canal connection status (1=connected, 0=disconnected)",
}),
ProcessedMessages: prometheus.NewCounter(prometheus.CounterOpts{
Name: "canal_processed_messages_total",
Help: "Total number of processed messages",
}),
// ... 其他指标初始化
}
}

// 监控处理函数
func (c *CanalClient) ProcessWithMetrics(entry *protocol.Entry) error {
start := time.Now()
defer func() {
c.metrics.ProcessingDuration.Observe(time.Since(start).Seconds())
}()

if err := c.processEntry(entry); err != nil {
c.metrics.ProcessingErrors.Inc()
return err
}

c.metrics.ProcessedMessages.Inc()
return nil
}

7. 原有问题保留:如何配置 MySQL 以支持 Canal?

MySQL 配置要求:

[mysqld]
# 开启 binlog
log-bin=mysql-bin
# 选择 ROW 模式
binlog_format=ROW
# 配置 server-id(不能与 Canal 的 slaveId 重复)
server_id=12345

用户权限配置:

-- 创建 Canal 用户
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';

-- 授予必要权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

-- 验证配置
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';

这些面试题涵盖了 Canal 的核心概念、架构设计、Go 语言实现、高可用方案、监控运维等多个方面,既保留了原有的技术细节,又增加了大厂常见的系统设计和工程实践问题。

Reference